{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Answer Key to the Data Wrangling with DataFrames Coding Quiz\n",
    "\n",
    "Helpful resources:\n",
    "http://spark.apache.org/docs/latest/api/python/pyspark.sql.html"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg\n",
    "from pyspark.sql.functions import sum as Fsum\n",
    "from pyspark.sql.window import Window\n",
    "from pyspark.sql.types import IntegerType"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 1) import any other libraries you might need\n",
    "# 2) instantiate a Spark session \n",
    "# 3) read in the data set located at the path \"data/sparkify_log_small.json\"\n",
    "# 4) write code to answer the quiz questions \n",
    "\n",
    "spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"Data Frames practice\") \\\n",
    "    .getOrCreate()\n",
    "\n",
    "df = spark.read.json(\"data/sparkify_log_small.json\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Question 1\n",
    "\n",
    "Which page did user id \"\" (empty string) NOT visit?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- artist: string (nullable = true)\n",
      " |-- auth: string (nullable = true)\n",
      " |-- firstName: string (nullable = true)\n",
      " |-- gender: string (nullable = true)\n",
      " |-- itemInSession: long (nullable = true)\n",
      " |-- lastName: string (nullable = true)\n",
      " |-- length: double (nullable = true)\n",
      " |-- level: string (nullable = true)\n",
      " |-- location: string (nullable = true)\n",
      " |-- method: string (nullable = true)\n",
      " |-- page: string (nullable = true)\n",
      " |-- registration: long (nullable = true)\n",
      " |-- sessionId: long (nullable = true)\n",
      " |-- song: string (nullable = true)\n",
      " |-- status: long (nullable = true)\n",
      " |-- ts: long (nullable = true)\n",
      " |-- userAgent: string (nullable = true)\n",
      " |-- userId: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Logout\n",
      "Error\n",
      "Submit Upgrade\n",
      "Submit Downgrade\n",
      "Save Settings\n",
      "Downgrade\n",
      "Settings\n",
      "Upgrade\n",
      "NextSong\n"
     ]
    }
   ],
   "source": [
    "# filter for users with blank user id\n",
    "blank_pages = df.filter(df.userId == '') \\\n",
    "    .select(col('page') \\\n",
    "    .alias('blank_pages')) \\\n",
    "    .dropDuplicates()\n",
    "\n",
    "# get a list of possible pages that could be visited\n",
    "all_pages = df.select('page').dropDuplicates()\n",
    "\n",
    "# find values in all_pages that are not in blank_pages\n",
    "# these are the pages that the blank user did not go to\n",
    "for row in set(all_pages.collect()) - set(blank_pages.collect()):\n",
    "    print(row.page)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Question 2 - Reflect\n",
    "\n",
    "What type of user does the empty string user id most likely refer to?\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Perhaps it represents users who have not signed up yet or who are signed out and are about to log in."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Question 3\n",
    "\n",
    "How many female users do we have in the data set?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "462"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.filter(df.gender == 'F') \\\n",
    "    .select('userId', 'gender') \\\n",
    "    .dropDuplicates() \\\n",
    "    .count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Question 4\n",
    "\n",
    "How many songs were played from the most played artist?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+-----------+\n",
      "|  Artist|Artistcount|\n",
      "+--------+-----------+\n",
      "|Coldplay|         83|\n",
      "+--------+-----------+\n",
      "only showing top 1 row\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter(df.page == 'NextSong') \\\n",
    "    .select('Artist') \\\n",
    "    .groupBy('Artist') \\\n",
    "    .agg({'Artist':'count'}) \\\n",
    "    .withColumnRenamed('count(Artist)', 'Artistcount') \\\n",
    "    .sort(desc('Artistcount')) \\\n",
    "    .show(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Question 5 (challenge)\n",
    "\n",
    "How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+\n",
      "|avg(count(period))|\n",
      "+------------------+\n",
      "| 6.898347107438017|\n",
      "+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# TODO: filter out 0 sum and max sum to get more exact answer\n",
    "\n",
    "function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())\n",
    "\n",
    "user_window = Window \\\n",
    "    .partitionBy('userID') \\\n",
    "    .orderBy(desc('ts')) \\\n",
    "    .rangeBetween(Window.unboundedPreceding, 0)\n",
    "\n",
    "cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \\\n",
    "    .select('userID', 'page', 'ts') \\\n",
    "    .withColumn('homevisit', function(col('page'))) \\\n",
    "    .withColumn('period', Fsum('homevisit').over(user_window))\n",
    "\n",
    "cusum.filter((cusum.page == 'NextSong')) \\\n",
    "    .groupBy('userID', 'period') \\\n",
    "    .agg({'period':'count'}) \\\n",
    "    .agg({'count(period)':'avg'}).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
